package com.xinyue.game.logic.frame;

import javax.annotation.PostConstruct;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

import com.xinyue.game.logic.frame.handler.HandlerMethod;
import com.xinyue.network.message.stream.AbstractMessageHeader;
import com.xinyue.network.message.stream.IGameMessage;
import com.xinyue.network.message.stream.ServiceMessageHeader;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
 * RocketMQ listener that receives messages sent by the gateway, decodes them into
 * {@link IGameMessage} instances and dispatches each one to the handler method registered
 * for its message id.
 *
 * <p>The handler is not invoked directly from {@link #onMessage(MessageExt)}; it is wrapped in a
 * task that is submitted to the game channel looked up by the player id found in the message
 * header.
 */
@RocketMQMessageListener(topic = "${spring.cloud.nacos.discovery.namespace}-XinyueBusinessTopic-${xinyue.game.common.config.local-server-id}",
consumeMode = ConsumeMode.ORDERLY, consumerGroup = "${spring.cloud.nacos.discovery.namespace}-BusinessConsumeGroup-${xinyue.game.common.config.local-server-id}")
public class GatewayMessageReceiveService implements RocketMQListener<MessageExt> {

    private static final Logger logger = LoggerFactory.getLogger(GatewayMessageReceiveService.class);

    /** Registry used to look up request message prototypes and handler methods by message id. */
    @Autowired
    private GameRequestHandlerContext handlcoContext;

    /** Used to resolve the {@code ${...}} placeholders declared on the listener annotation. */
    @Autowired
    private Environment environment;

    /** Topic name with all placeholders resolved; assigned in {@link #init()}. */
    private String topic;

    /**
     * Resolves the placeholders in the topic and consumer group declared on this class's
     * {@link RocketMQMessageListener} annotation, stores the resolved topic and logs both values.
     */
    @PostConstruct
    public void init() {
        RocketMQMessageListener messageListener = this.getClass().getAnnotation(RocketMQMessageListener.class);
        topic = environment.resolveRequiredPlaceholders(messageListener.topic());
        String consumerGroup = environment.resolveRequiredPlaceholders(messageListener.consumerGroup());

        logger.info("监听网关消息，topic:{},group:{}", topic, consumerGroup);
    }

    /**
     * @return the topic resolved in {@link #init()}, or {@code null} if {@code init()} has not run yet
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Decodes a gateway message and schedules its handler on the owning player's game channel.
     *
     * <p>Messages are skipped (with a log entry) when the server has not finished starting, when
     * no message object is registered for the message id, or when no handler method is registered
     * for it. Decoding failures and handler failures are logged and never propagated to the caller.
     *
     * @param message the raw RocketMQ message whose body carries the encoded game message
     */
    @Override
    public void onMessage(MessageExt message) {
        if (!XinyueGameServerBoot.isStarted()) {
            // NOTE(review): returning normally presumably acknowledges the message, so it would be
            // dropped rather than deferred -- confirm this is the intended behaviour.
            logger.warn("服务未启动成功，收到消息暂不处理");
            return;
        }
        try {
            final IGameMessage gameMessage = this.getGameMessage(message.getBody());
            if (gameMessage == null) {
                // getGameMessage has already logged the unknown message id.
                return;
            }
            final ServiceMessageHeader header = (ServiceMessageHeader) gameMessage.getHeader();
            final HandlerMethod handlerMethod = this.handlcoContext.getHandlerMethod(header.getMessageId());
            if (handlerMethod == null) {
                // Fail fast with a specific message instead of a NullPointerException inside the task.
                logger.error("找不到messageId {} 对应的处理方法", header.getMessageId());
                return;
            }
            XinyueGameServerBoot.serverContext.getGameChannelGroup().gameChannelConsumer(header.getPlayerId(), gameChannel -> {
                gameChannel.submitTask(() -> {
                    try {
                        logger.debug("=>{}", header);
                        GameServerContext gameServerContext = new GameServerContext(header, gameChannel);
                        handlerMethod.getTargetMethod().invoke(handlerMethod.getTargetObj(), gameMessage, gameServerContext);
                    } catch (Throwable e) {
                        // Catch everything so a failing handler cannot escape from the submitted task.
                        logger.error("网关消息处理出错", e);
                    }
                }, "处理网关消息");
            });
        } catch (Exception e) {
            logger.error("解析网关数据出错", e);
        }
    }

    /**
     * Decodes the raw message body into a game message.
     *
     * <p>The message id is read from the buffer first and used to obtain the matching message
     * object from the handler context, which then reads the buffer itself.
     *
     * @param data the raw message body
     * @return the decoded message, or {@code null} if no message object is registered for the id
     * @throws Exception if reading the message id or the message content fails
     */
    private IGameMessage getGameMessage(byte[] data) throws Exception {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
        int messageId = AbstractMessageHeader.readMessageId(byteBuf);
        IGameMessage gameMessage = handlcoContext.getRequestMessage(messageId);
        if (gameMessage != null) {
            gameMessage.read(byteBuf);
        } else {
            logger.error("找不到messageId {} 对应的消息对象", messageId);
        }
        return gameMessage;
    }

}
